<!DOCTYPE html>
<html lang="zh-cn">
<head>
  <title>多进程模型和进程间通讯 - 为企业级框架和应用而生</title>
  <meta charset="utf-8">
  <meta name="description" content="index.description">
  <meta http-equiv="X-UA-Compatible" content="IE=Edge,chrome=1">
  <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no">
  <link rel="icon" href="/images/favicon.png" type="image/x-icon">
  <link rel="stylesheet" href="https://cdn.jsdelivr.net/docsearch.js/2/docsearch.min.css" />
<link rel="stylesheet" href="/css/index.css">

    <script>
    !function(t,e,a,r,c){t.TracertCmdCache=t.TracertCmdCache||[],t[c]=window[c]||
      {_isInit:!0,call:function(){t.TracertCmdCache.push(arguments)},
      start:function(t){this.call('start',t)}},t[c].l=new Date;
      var n=e.createElement(a),s=e.getElementsByTagName(a)[0];
      n.async=!0,n.src=r,s.parentNode.insertBefore(n,s)}
    (window,document,'script','https://tracert.alipay.com/tracert.js','Tracert');
      Tracert.start({
        plugins: [ 'BucName' ],
        spmAPos: 'a454',
        spmBPos: 'b4893',
      });
    </script>
  
</head>
<body>
  <div class="nav" >
  <header>
    <a href="/zh-cn/" class="nav-logo leftpadding" alt="egg"><img src="https://zos.alipayobjects.com/rmsportal/VTcUYAaoKqXyHJbLAPyF.svg"></a>
    <ul class="nav-item">
      <li>
        <form id="search-form">
          <input type="text" id="search-query" class="search-query st-default-search-input">
        </form>
      </li>
      <li><a href="/zh-cn/intro/" alt="指南">指南</a></li><li><a href="/api/" alt="API">API</a></li><li><a href="/zh-cn/tutorials/index.html" alt="教程">教程</a></li><li><a href="https://github.com/search?q=topic%3Aegg-plugin&type=Repositories" alt="插件">插件</a></li><li><a href="https://github.com/eggjs/egg/releases" alt="发布日志">发布日志</a></li>
      
      
        <li class="translations">
          <a class="nav-link">Translations</a>
          <span class="arrow"></span><ul id="dropdownContent" class="dropdown-content"><li><a id="en" href="/en/core/cluster-and-ipc.html" >English</a></li><li><a id="zh-cn" href="/zh-cn/core/cluster-and-ipc.html" style="color: #22ab28">中文</a></li></ul>
        </li>
      
      <li><iframe src="https://ghbtns.com/github-btn.html?user=eggjs&repo=egg&type=star&count=true" frameborder="0" scrolling="0" width="150px" height="20px"></iframe></li>
    </ul>
    <a id="mobileTrigger" href="#" class="mobile-trigger">
      <ul>
        <li></li>
        <li></li>
        <li></li>
      </ul>
    </a>
  </header>
</div>
  <div id="container" class="container">
    <div class="page-main">
  <article class="markdown-body">
    <h1>多进程模型和进程间通讯</h1>
    <p>我们知道 JavaScript 代码是运行在单线程上的，换句话说一个 Node.js 进程只能运行在一个 CPU 上。那么如果用 Node.js 来做 Web Server，就无法享受到多核运算的好处。作为企业级的解决方案，我们要解决的一个问题就是:</p>
<blockquote>
<p>如何榨干服务器资源，利用上多核 CPU 的并发优势？</p>
</blockquote>
<p>而 Node.js 官方提供的解决方案是 <a href="https://nodejs.org/api/cluster.html" target="_blank" rel="noopener">Cluster 模块</a></p>
<blockquote>
<p>A single instance of Node.js runs in a single thread. To take advantage of multi-core systems the user will sometimes want to launch a cluster of Node.js processes to handle the load.</p>
</blockquote>
<blockquote>
<p>The cluster module allows you to easily create child processes that all share server ports.</p>
</blockquote>
<h2 id="cluster-是什么呢"><a class="markdown-anchor" href="#cluster-是什么呢">#</a> Cluster 是什么呢？</h2>
<p>简单的说，</p>
<ul>
<li>在服务器上同时启动多个进程。</li>
<li>每个进程里都跑的是同一份源代码（好比把以前一个进程的工作分给多个进程去做）。</li>
<li>更神奇的是，这些进程可以同时监听一个端口（具体原理推荐阅读 @DavidCai1993 这篇 <a href="https://cnodejs.org/topic/56e84480833b7c8a0492e20c" target="_blank" rel="noopener">Cluster 实现原理</a>）。</li>
</ul>
<p>其中：</p>
<ul>
<li>负责启动其他进程的叫做 Master 进程，他好比是个『包工头』，不做具体的工作，只负责启动其他进程。</li>
<li>其他被启动的叫 Worker 进程，顾名思义就是干活的『工人』。它们接收请求，对外提供服务。</li>
<li>Worker 进程的数量一般根据服务器的 CPU 核数来定，这样就可以完美利用多核资源。</li>
</ul>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="keyword">const</span> cluster = <span class="built_in">require</span>(<span class="string">'cluster'</span>);</span><br><span class="line"><span class="keyword">const</span> http = <span class="built_in">require</span>(<span class="string">'http'</span>);</span><br><span class="line"><span class="keyword">const</span> numCPUs = <span class="built_in">require</span>(<span class="string">'os'</span>).cpus().length;</span><br><span class="line"></span><br><span class="line"><span class="keyword">if</span> (cluster.isMaster) &#123;</span><br><span class="line">  <span class="comment">// Fork workers.</span></span><br><span class="line">  <span class="keyword">for</span> (<span class="keyword">let</span> i = <span class="number">0</span>; i &lt; numCPUs; i++) &#123;</span><br><span class="line">    cluster.fork();</span><br><span class="line">  &#125;</span><br><span class="line"></span><br><span class="line">  cluster.on(<span class="string">'exit'</span>, <span class="function"><span class="keyword">function</span>(<span class="params">worker, code, signal</span>) </span>&#123;</span><br><span class="line">    <span class="built_in">console</span>.log(<span class="string">'worker '</span> + worker.process.pid + <span class="string">' died'</span>);</span><br><span class="line">  &#125;);</span><br><span class="line">&#125; <span class="keyword">else</span> &#123;</span><br><span class="line">  <span class="comment">// Workers can share any TCP connection</span></span><br><span class="line">  <span class="comment">// In this case it is an HTTP server</span></span><br><span class="line">  http.createServer(<span class="function"><span class="keyword">function</span>(<span class="params">req, res</span>) </span>&#123;</span><br><span class="line">    res.writeHead(<span class="number">200</span>);</span><br><span class="line">    res.end(<span class="string">"hello world\n"</span>);</span><br><span class="line">  &#125;).listen(<span class="number">8000</span>);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<h2 id="框架的多进程模型"><a class="markdown-anchor" href="#框架的多进程模型">#</a> 框架的多进程模型</h2>
<p>上面的示例是不是很简单，但是作为企业级的解决方案，要考虑的东西还有很多。</p>
<ul>
<li>Worker 进程异常退出以后该如何处理？</li>
<li>多个 Worker 进程之间如何共享资源？</li>
<li>多个 Worker 进程之间如何调度？</li>
<li>...</li>
</ul>
<h3 id="进程守护"><a class="markdown-anchor" href="#进程守护">#</a> 进程守护</h3>
<p>健壮性（又叫鲁棒性）是企业级应用必须考虑的问题，除了程序本身代码质量要保证，框架层面也需要提供相应的『兜底』机制保证极端情况下应用的可用性。</p>
<p>一般来说，Node.js 进程退出可以分为两类：</p>
<h4 id="未捕获异常"><a class="markdown-anchor" href="#未捕获异常">#</a> 未捕获异常</h4>
<p>当代码抛出了异常没有被捕获到时，进程将会退出，此时 Node.js 提供了 <code>process.on('uncaughtException', handler)</code> 接口来捕获它，但是当一个 Worker 进程遇到 <a href="https://nodejs.org/dist/latest-v6.x/docs/api/process.html#process_event_uncaughtexception" target="_blank" rel="noopener">未捕获的异常</a> 时，它已经处于一个不确定状态，此时我们应该让这个进程优雅退出：</p>
<ol>
<li>关闭异常 Worker 进程所有的 TCP Server（将已有的连接快速断开，且不再接收新的连接），断开和 Master 的 IPC 通道，不再接受新的用户请求。</li>
<li>Master 立刻 fork 一个新的 Worker 进程，保证在线的『工人』总数不变。</li>
<li>异常 Worker 等待一段时间，处理完已经接受的请求后退出。</li>
</ol>
<figure class="highlight bash"><table><tr><td class="code"><pre><span class="line">+---------+                 +---------+</span><br><span class="line">|  Worker |                 |  Master |</span><br><span class="line">+---------+                 +----+----+</span><br><span class="line">     | uncaughtException         |</span><br><span class="line">     +------------+              |</span><br><span class="line">     |            |              |                   +---------+</span><br><span class="line">     | &lt;----------+              |                   |  Worker |</span><br><span class="line">     |                           |                   +----+----+</span><br><span class="line">     |        disconnect         |   fork a new worker    |</span><br><span class="line">     +-------------------------&gt; + ---------------------&gt; |</span><br><span class="line">     |         <span class="built_in">wait</span>...           |                        |</span><br><span class="line">     |          <span class="built_in">exit</span>             |                        |</span><br><span class="line">     +-------------------------&gt; |                        |</span><br><span class="line">     |                           |                        |</span><br><span class="line">    die                          |                        |</span><br><span class="line">                                 |                        |</span><br><span class="line">                                 |                        |</span><br></pre></td></tr></table></figure>
<h4 id="oom-系统异常"><a class="markdown-anchor" href="#oom-系统异常">#</a> OOM、系统异常</h4>
<p>而当一个进程出现异常导致 crash 或者 OOM 被系统杀死时，不像未捕获异常发生时我们还有机会让进程继续执行，只能够让当前进程直接退出，Master 立刻 fork 一个新的 Worker。</p>
<p>在框架里，我们采用 <a href="https://github.com/node-modules/graceful" target="_blank" rel="noopener">graceful</a> 和 <a href="https://github.com/eggjs/egg-cluster" target="_blank" rel="noopener">egg-cluster</a> 两个模块配合实现上面的逻辑。这套方案已在阿里巴巴和蚂蚁金服的生产环境广泛部署，且经受过『双11』大促的考验，所以是相对稳定和靠谱的。</p>
<h3 id="agent-机制"><a class="markdown-anchor" href="#agent-机制">#</a> Agent 机制</h3>
<p>说到这里，Node.js 多进程方案貌似已经成型，这也是我们早期线上使用的方案。但后来我们发现有些工作其实不需要每个 Worker 都去做，如果都做，一来是浪费资源，更重要的是可能会导致多进程间资源访问冲突。举个例子：生产环境的日志文件我们一般会按照日期进行归档，在单进程模型下这再简单不过了：</p>
<blockquote>
<ol>
<li>每天凌晨 0 点，将当前日志文件按照日期进行重命名</li>
<li>销毁以前的文件句柄，并创建新的日志文件继续写入</li>
</ol>
</blockquote>
<p>试想如果现在是 4 个进程来做同样的事情，是不是就乱套了。所以，对于这一类后台运行的逻辑，我们希望将它们放到一个单独的进程上去执行，这个进程就叫 Agent Worker，简称 Agent。Agent 好比是 Master 给其他 Worker 请的一个『秘书』，它不对外提供服务，只给 App Worker 打工，专门处理一些公共事务。现在我们的多进程模型就变成下面这个样子了</p>
<figure class="highlight bash"><table><tr><td class="code"><pre><span class="line">                +--------+          +-------+</span><br><span class="line">                | Master |&lt;--------&gt;| Agent |</span><br><span class="line">                +--------+          +-------+</span><br><span class="line">                ^   ^    ^</span><br><span class="line">               /    |     \</span><br><span class="line">             /      |       \</span><br><span class="line">           /        |         \</span><br><span class="line">         v          v          v</span><br><span class="line">+----------+   +----------+   +----------+</span><br><span class="line">| Worker 1 |   | Worker 2 |   | Worker 3 |</span><br><span class="line">+----------+   +----------+   +----------+</span><br></pre></td></tr></table></figure>
<p>那我们框架的启动时序如下：</p>
<figure class="highlight bash"><table><tr><td class="code"><pre><span class="line">+---------+           +---------+          +---------+</span><br><span class="line">|  Master |           |  Agent  |          |  Worker |</span><br><span class="line">+---------+           +----+----+          +----+----+</span><br><span class="line">     |      fork agent     |                    |</span><br><span class="line">     +--------------------&gt;|                    |</span><br><span class="line">     |      agent ready    |                    |</span><br><span class="line">     |&lt;--------------------+                    |</span><br><span class="line">     |                     |     fork worker    |</span><br><span class="line">     +-----------------------------------------&gt;|</span><br><span class="line">     |     worker ready    |                    |</span><br><span class="line">     |&lt;-----------------------------------------+</span><br><span class="line">     |      Egg ready      |                    |</span><br><span class="line">     +--------------------&gt;|                    |</span><br><span class="line">     |      Egg ready      |                    |</span><br><span class="line">     +-----------------------------------------&gt;|</span><br></pre></td></tr></table></figure>
<ol>
<li>Master 启动后先 fork Agent 进程</li>
<li>Agent 初始化成功后，通过 IPC 通道通知 Master</li>
<li>Master 再 fork 多个 App Worker</li>
<li>App Worker 初始化成功，通知 Master</li>
<li>所有的进程初始化成功后，Master 通知 Agent 和 Worker 应用启动成功</li>
</ol>
<p>另外，关于 Agent Worker 还有几点需要注意的是：</p>
<ol>
<li>由于 App Worker 依赖于 Agent，所以必须等 Agent 初始化完成后才能 fork App Worker</li>
<li>Agent 虽然是 App Worker 的『小秘』，但是业务相关的工作不应该放到 Agent 上去做，不然把她累垮了就不好了</li>
<li>由于 Agent 的特殊定位，<strong>我们应该保证它相对稳定</strong>。当它发生未捕获异常，框架不会像 App Worker 一样让他退出重启，而是记录异常日志、报警等待人工处理</li>
<li>Agent 和普通 App Worker 挂载的 API 不完全一样，如何识别差异可查看<a href="../advanced/framework.html">框架文档</a></li>
</ol>
<h3 id="agent-的用法"><a class="markdown-anchor" href="#agent-的用法">#</a> Agent 的用法</h3>
<p>你可以在应用或插件根目录下的 <code>agent.js</code> 中实现你自己的逻辑（和<a href="../basics/app-start.html">启动自定义</a> 用法类似，只是入口参数是 agent 对象）</p>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="comment">// agent.js</span></span><br><span class="line"><span class="built_in">module</span>.exports = <span class="function"><span class="params">agent</span> =&gt;</span> &#123;</span><br><span class="line">  <span class="comment">// 在这里写你的初始化逻辑</span></span><br><span class="line"></span><br><span class="line">  <span class="comment">// 也可以通过 messenger 对象发送消息给 App Worker</span></span><br><span class="line">  <span class="comment">// 但需要等待 App Worker 启动成功后才能发送，不然很可能丢失</span></span><br><span class="line">  agent.messenger.on(<span class="string">'egg-ready'</span>, () =&gt; &#123;</span><br><span class="line">    <span class="keyword">const</span> data = &#123; ... &#125;;</span><br><span class="line">    agent.messenger.sendToApp(<span class="string">'xxx_action'</span>, data);</span><br><span class="line">  &#125;);</span><br><span class="line">&#125;;</span><br></pre></td></tr></table></figure>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="comment">// app.js</span></span><br><span class="line"><span class="built_in">module</span>.exports = <span class="function"><span class="params">app</span> =&gt;</span> &#123;</span><br><span class="line">  app.messenger.on(<span class="string">'xxx_action'</span>, data =&gt; &#123;</span><br><span class="line">    <span class="comment">// ...</span></span><br><span class="line">  &#125;);</span><br><span class="line">&#125;;</span><br></pre></td></tr></table></figure>
<p>这个例子中，<code>agent.js</code> 的代码会执行在 agent 进程上，<code>app.js</code> 的代码会执行在 Worker 进程上，他们通过框架封装的 <code>messenger</code> 对象进行进程间通讯（IPC），后面的章节会对框架的 IPC 做详细的讲解。</p>
<h3 id="master-vs-agent-vs-worker"><a class="markdown-anchor" href="#master-vs-agent-vs-worker">#</a> Master VS Agent VS Worker</h3>
<p>当一个应用启动时，会同时启动这三类进程。</p>
<table>
<thead>
<tr>
<th>类型</th>
<th>进程数量</th>
<th>作用</th>
<th>稳定性</th>
<th>是否运行业务代码</th>
</tr>
</thead>
<tbody>
<tr>
<td>Master</td>
<td>1</td>
<td>进程管理，进程间消息转发</td>
<td>非常高</td>
<td>否</td>
</tr>
<tr>
<td>Agent</td>
<td>1</td>
<td>后台运行工作（长连接客户端）</td>
<td>高</td>
<td>少量</td>
</tr>
<tr>
<td>Worker</td>
<td>一般设置为 CPU 核数</td>
<td>执行业务代码</td>
<td>一般</td>
<td>是</td>
</tr>
</tbody>
</table>
<h4 id="master"><a class="markdown-anchor" href="#master">#</a> Master</h4>
<p>在这个模型下，Master 进程承担了进程管理的工作（类似 <a href="https://github.com/Unitech/pm2" target="_blank" rel="noopener">pm2</a>），不运行任何业务代码，我们只需要运行起一个 Master 进程它就会帮我们搞定所有的 Worker、Agent 进程的初始化以及重启等工作了。</p>
<p>Master 进程的稳定性是极高的，线上运行时我们只需要通过 <a href="https://github.com/eggjs/egg-scripts" target="_blank" rel="noopener">egg-scripts</a> 后台运行通过 <code>egg.startCluster</code> 启动的 Master 进程就可以了，不再需要使用 <a href="https://github.com/Unitech/pm2" target="_blank" rel="noopener">pm2</a> 等进程守护模块。</p>
<figure class="highlight bash"><table><tr><td class="code"><pre><span class="line">$ egg-scripts start --daemon</span><br></pre></td></tr></table></figure>
<h4 id="agent"><a class="markdown-anchor" href="#agent">#</a> Agent</h4>
<p>在大部分情况下，我们在写业务代码的时候完全不用考虑 Agent 进程的存在，但是当我们遇到一些场景，只想让代码运行在一个进程上的时候，Agent 进程就到了发挥作用的时候了。</p>
<p>由于 Agent 只有一个，而且会负责许多维持连接的脏活累活，因此它不能轻易挂掉和重启，所以 Agent 进程在监听到未捕获异常时不会退出，但是会打印出错误日志，<strong>我们需要对日志中的未捕获异常提高警惕</strong>。</p>
<h4 id="worker"><a class="markdown-anchor" href="#worker">#</a> Worker</h4>
<p>Worker 进程负责处理真正的用户请求和<a href="../basics/schedule.html">定时任务</a>的处理。而 Egg 的定时任务也提供了只让一个 Worker 进程运行的能力，<strong>所以能够通过定时任务解决的问题就不要放到 Agent 上执行</strong>。</p>
<p>Worker 运行的是业务代码，相对会比 Agent 和 Master 进程上运行的代码复杂度更高，稳定性也低一点，<strong>当 Worker 进程异常退出时，Master 进程会重启一个 Worker 进程。</strong></p>
<h2 id="进程间通讯ipc"><a class="markdown-anchor" href="#进程间通讯ipc">#</a> 进程间通讯（IPC）</h2>
<p>虽然每个 Worker 进程是相对独立的，但是它们之间始终还是需要通讯的，叫进程间通讯（IPC）。下面是 Node.js 官方提供的一段示例代码</p>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="meta">'use strict'</span>;</span><br><span class="line"><span class="keyword">const</span> cluster = <span class="built_in">require</span>(<span class="string">'cluster'</span>);</span><br><span class="line"></span><br><span class="line"><span class="keyword">if</span> (cluster.isMaster) &#123;</span><br><span class="line">  <span class="keyword">const</span> worker = cluster.fork();</span><br><span class="line">  worker.send(<span class="string">'hi there'</span>);</span><br><span class="line">  worker.on(<span class="string">'message'</span>, msg =&gt; &#123;</span><br><span class="line">    <span class="built_in">console</span>.log(<span class="string">`msg: <span class="subst">$&#123;msg&#125;</span> from worker#<span class="subst">$&#123;worker.id&#125;</span>`</span>);</span><br><span class="line">  &#125;);</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (cluster.isWorker) &#123;</span><br><span class="line">  process.on(<span class="string">'message'</span>, (msg) =&gt; &#123;</span><br><span class="line">    process.send(msg);</span><br><span class="line">  &#125;);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>细心的你可能已经发现 cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之间，Worker 与 Agent 进程互相间是没有的。那么 Worker 之间想通讯该怎么办呢？是的，通过 Master 来转发。</p>
<figure class="highlight bash"><table><tr><td class="code"><pre><span class="line">广播消息： agent =&gt; all workers</span><br><span class="line">                  +--------+          +-------+</span><br><span class="line">                  | Master |&lt;---------| Agent |</span><br><span class="line">                  +--------+          +-------+</span><br><span class="line">                 /    |     \</span><br><span class="line">                /     |      \</span><br><span class="line">               /      |       \</span><br><span class="line">              /       |        \</span><br><span class="line">             v        v         v</span><br><span class="line">  +----------+   +----------+   +----------+</span><br><span class="line">  | Worker 1 |   | Worker 2 |   | Worker 3 |</span><br><span class="line">  +----------+   +----------+   +----------+</span><br><span class="line"></span><br><span class="line">指定接收方： one worker =&gt; another worker</span><br><span class="line">                  +--------+          +-------+</span><br><span class="line">                  | Master |----------| Agent |</span><br><span class="line">                  +--------+          +-------+</span><br><span class="line">                 ^    |</span><br><span class="line">     send to    /     |</span><br><span class="line">    worker 2   /      |</span><br><span class="line">              /       |</span><br><span class="line">             /        v</span><br><span class="line">  +----------+   +----------+   +----------+</span><br><span class="line">  | Worker 1 |   | Worker 2 |   | Worker 3 |</span><br><span class="line">  +----------+   +----------+   +----------+</span><br></pre></td></tr></table></figure>
<p>为了方便调用，我们封装了一个 messenger 对象挂在 app / agent 实例上，提供一系列友好的 API。</p>
<h3 id="发送"><a class="markdown-anchor" href="#发送">#</a> 发送</h3>
<ul>
<li><code>app.messenger.broadcast(action, data)</code>：发送给所有的 agent / app 进程（包括自己）</li>
<li><code>app.messenger.sendToApp(action, data)</code>: 发送给所有的 app 进程
<ul>
<li>在 app 上调用该方法会发送给自己和其他的 app 进程</li>
<li>在 agent 上调用该方法会发送给所有的 app 进程</li>
</ul>
</li>
<li><code>app.messenger.sendToAgent(action, data)</code>: 发送给 agent 进程
<ul>
<li>在 app 上调用该方法会发送给 agent 进程</li>
<li>在 agent 上调用该方法会发送给 agent 自己</li>
</ul>
</li>
<li><code>agent.messenger.sendRandom(action, data)</code>:
<ul>
<li>app 上没有该方法（现在 Egg 的实现是等同于 sentToAgent）</li>
<li>agent 会随机发送消息给一个 app 进程（由 master 来控制发送给谁）</li>
</ul>
</li>
<li><code>app.messenger.sendTo(pid, action, data)</code>: 发送给指定进程</li>
</ul>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="comment">// app.js</span></span><br><span class="line"><span class="built_in">module</span>.exports = <span class="function"><span class="params">app</span> =&gt;</span> &#123;</span><br><span class="line">  <span class="comment">// 注意，只有在 egg-ready 事件拿到之后才能发送消息</span></span><br><span class="line">  app.messenger.once(<span class="string">'egg-ready'</span>, () =&gt; &#123;</span><br><span class="line">    app.messenger.sendToAgent(<span class="string">'agent-event'</span>, &#123; <span class="attr">foo</span>: <span class="string">'bar'</span> &#125;);</span><br><span class="line">    app.messenger.sendToApp(<span class="string">'app-event'</span>, &#123; <span class="attr">foo</span>: <span class="string">'bar'</span> &#125;);</span><br><span class="line">  &#125;);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p><em>上面所有 <code>app.messenger</code> 上的方法都可以在 <code>agent.messenger</code> 上使用。</em></p>
<h4 id="egg-ready"><a class="markdown-anchor" href="#egg-ready">#</a> egg-ready</h4>
<p>上面的示例中提到，需要等 <code>egg-ready</code> 消息之后才能发送消息。只有在 Master 确认所有的 Agent 进程和 Worker 进程都已经成功启动（并 ready）之后，才会通过 messenger 发送 <code>egg-ready</code> 消息给所有的 Agent 和 Worker，告知一切准备就绪，IPC 通道可以开始使用了。</p>
<h3 id="接收"><a class="markdown-anchor" href="#接收">#</a> 接收</h3>
<p>在 messenger 上监听对应的 action 事件，就可以收到其他进程发送来的信息了。</p>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line">app.messenger.on(action, data =&gt; &#123;</span><br><span class="line">  <span class="comment">// process data</span></span><br><span class="line">&#125;);</span><br><span class="line">app.messenger.once(action, data =&gt; &#123;</span><br><span class="line">  <span class="comment">// process data</span></span><br><span class="line">&#125;);</span><br></pre></td></tr></table></figure>
<p><em>agent 上的 messenger 接收消息的用法和 app 上一致。</em></p>
<h2 id="ipc-实战"><a class="markdown-anchor" href="#ipc-实战">#</a> IPC 实战</h2>
<p>我们通过一个简单的例子来感受一下在框架的多进程模型下如何使用 IPC 解决实际问题。</p>
<h3 id="需求"><a class="markdown-anchor" href="#需求">#</a> 需求</h3>
<p>我们有一个接口需要从远程数据源中读取一些数据，对外部提供 API，但是这个数据源的数据很少变化，因此我们希望将数据缓存到内存中以提升服务能力，降低 RT。此时就需要有一个更新内存缓存的机制。</p>
<ol>
<li>定时从远程数据源获取数据，更新内存缓存，为了降低对数据源压力，更新的间隔时间会设置的比较长。</li>
<li>远程数据源提供一个检查是否有数据更新的接口，我们的服务可以更频繁的调用检查接口，当有数据更新时才去重新拉取数据。</li>
<li>远程数据源通过消息中间件推送数据更新的消息，我们的服务监听消息来更新数据。</li>
</ol>
<p>在实际项目中，我们可以采用方案一用于兜底，结合方案三或者方案二的一种用于提升数据更新的实时性。而在这个示例中，我们会通过 IPC + <a href="../basics/schedule.html">定时任务</a>来同时实现这三种缓存更新方案。</p>
<h3 id="实现"><a class="markdown-anchor" href="#实现">#</a> 实现</h3>
<p>我们将所有的与远程数据源交互的逻辑封装在一个 Service 中，并提供 <code>get</code> 方法给 Controller 调用。</p>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="comment">// app/service/source.js</span></span><br><span class="line"><span class="keyword">let</span> memoryCache = &#123;&#125;;</span><br><span class="line"></span><br><span class="line"><span class="class"><span class="keyword">class</span> <span class="title">SourceService</span> <span class="keyword">extends</span> <span class="title">Service</span> </span>&#123;</span><br><span class="line">  get(key) &#123;</span><br><span class="line">    <span class="keyword">return</span> memoryCache[key];</span><br><span class="line">  &#125;</span><br><span class="line"></span><br><span class="line">  <span class="keyword">async</span> checkUpdate() &#123;</span><br><span class="line">    <span class="comment">// check if remote data source has changed</span></span><br><span class="line">    <span class="keyword">const</span> updated = <span class="keyword">await</span> mockCheck();</span><br><span class="line">    <span class="keyword">this</span>.ctx.logger.info(<span class="string">'check update response %s'</span>, updated);</span><br><span class="line">    <span class="keyword">return</span> updated;</span><br><span class="line">  &#125;</span><br><span class="line"></span><br><span class="line">  <span class="keyword">async</span> update() &#123;</span><br><span class="line">    <span class="comment">// update memory cache from remote</span></span><br><span class="line">    memoryCache = <span class="keyword">await</span> mockFetch();</span><br><span class="line">    <span class="keyword">this</span>.ctx.logger.info(<span class="string">'update memory cache from remote: %j'</span>, memoryCache);</span><br><span class="line">  &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>编写定时任务，实现方案一，每 10 分钟定时从远程数据源获取数据更新缓存做兜底。</p>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="comment">// app/schedule/force_refresh.js</span></span><br><span class="line">exports.schedule = &#123;</span><br><span class="line">  interval: <span class="string">'10m'</span>,</span><br><span class="line">  type: <span class="string">'all'</span>, <span class="comment">// run in all workers</span></span><br><span class="line">&#125;;</span><br><span class="line"></span><br><span class="line">exports.task = <span class="keyword">async</span> ctx =&gt; &#123;</span><br><span class="line">  <span class="keyword">await</span> ctx.service.source.update();</span><br><span class="line">  ctx.app.lastUpdateBy = <span class="string">'force'</span>;</span><br><span class="line">&#125;;</span><br></pre></td></tr></table></figure>
<p>再编写一个定时任务来实现方案二的检查逻辑，每 10s 让一个 worker 调用检查接口，当发现数据有变化时，通过 messenger 提供的方法通知所有的 Worker。</p>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="comment">// app/schedule/pull_refresh.js</span></span><br><span class="line">exports.schedule = &#123;</span><br><span class="line">  interval: <span class="string">'10s'</span>,</span><br><span class="line">  type: <span class="string">'worker'</span>, <span class="comment">// only run in one worker</span></span><br><span class="line">&#125;;</span><br><span class="line"></span><br><span class="line">exports.task = <span class="keyword">async</span> ctx =&gt; &#123;</span><br><span class="line">  <span class="keyword">const</span> needRefresh = <span class="keyword">await</span> ctx.service.source.checkUpdate();</span><br><span class="line">  <span class="keyword">if</span> (!needRefresh) <span class="keyword">return</span>;</span><br><span class="line"></span><br><span class="line">  <span class="comment">// notify all workers to update memory cache from `file`</span></span><br><span class="line">  ctx.app.messenger.sendToApp(<span class="string">'refresh'</span>, <span class="string">'pull'</span>);</span><br><span class="line">&#125;;</span><br></pre></td></tr></table></figure>
<p>在启动自定义文件中监听 <code>refresh</code> 事件，并更新数据，所有的 Worker 进程都能收到这个消息，并触发更新，此时我们的方案二也已经大功告成了。</p>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="comment">// app.js</span></span><br><span class="line"><span class="built_in">module</span>.exports = <span class="function"><span class="params">app</span> =&gt;</span> &#123;</span><br><span class="line">  app.messenger.on(<span class="string">'refresh'</span>, by =&gt; &#123;</span><br><span class="line">    app.logger.info(<span class="string">'start update by %s'</span>, by);</span><br><span class="line">    <span class="comment">// create an anonymous context to access service</span></span><br><span class="line">    <span class="keyword">const</span> ctx = app.createAnonymousContext();</span><br><span class="line">    ctx.runInBackground(<span class="keyword">async</span> () =&gt; &#123;</span><br><span class="line">      <span class="keyword">await</span> ctx.service.source.update();</span><br><span class="line">      app.lastUpdateBy = by;</span><br><span class="line">    &#125;);</span><br><span class="line">  &#125;);</span><br><span class="line">&#125;;</span><br></pre></td></tr></table></figure>
<p>现在我们来看看如何实现第三个方案。我们需要有一个消息中间件的客户端，它会和服务端保持长连接，这一类的长连接维持比较适合在 Agent 进程上做，可以有效降低连接数，减少两端的消耗。所以我们在 Agent 进程上来开启消息监听。</p>
<figure class="highlight js"><table><tr><td class="code"><pre><span class="line"><span class="comment">// agent.js</span></span><br><span class="line"></span><br><span class="line"><span class="keyword">const</span> Subscriber = <span class="built_in">require</span>(<span class="string">'./lib/subscriber'</span>);</span><br><span class="line"></span><br><span class="line"><span class="built_in">module</span>.exports = <span class="function"><span class="params">agent</span> =&gt;</span> &#123;</span><br><span class="line">  <span class="keyword">const</span> subscriber = <span class="keyword">new</span> Subscriber();</span><br><span class="line">  <span class="comment">// listen changed event, broadcast to all workers</span></span><br><span class="line">  subscriber.on(<span class="string">'changed'</span>, () =&gt; agent.messenger.sendToApp(<span class="string">'refresh'</span>, <span class="string">'push'</span>));</span><br><span class="line">&#125;;</span><br></pre></td></tr></table></figure>
<p>通过合理使用 Agent 进程、定时任务和 IPC，我们可以轻松搞定类似的需求并降低对数据源的压力。具体的示例代码可以查看 <a href="https://github.com/eggjs/examples/tree/master/ipc" target="_blank" rel="noopener">examples/ipc</a>。</p>
<h2 id="更复杂的场景"><a class="markdown-anchor" href="#更复杂的场景">#</a> 更复杂的场景</h2>
<p>上面的例子中，我们在 Agent 进程上运行了一个 subscriber，来监听消息中间件的消息，如果 Worker 进程也需要监听一些消息怎么办？如何通过 Agent 进程建立连接再转发给 Worker 进程呢？这些问题可以在<a href="../advanced/cluster-client.html">多进程研发模式增强</a>中找到答案。</p>

  </article>
  <aside id="mobileAside" class="toc">
  <div class="mobile-menu">
    <ul>
      <li><a href="/zh-cn/intro/" alt="指南">指南</a></li><li><a href="/api/" alt="API">API</a></li><li><a href="/zh-cn/tutorials/index.html" alt="教程">教程</a></li><li><a href="https://github.com/search?q=topic%3Aegg-plugin&type=Repositories" alt="插件">插件</a></li><li><a href="https://github.com/eggjs/egg/releases" alt="发布日志">发布日志</a></li>
      
      
        <li class="translations">
          <a class="nav-link">Translations</a>
          <span class="arrow"></span><ul id="dropdownContent" class="dropdown-content"><li><a id="en" href="/en/core/cluster-and-ipc.html" >English</a></li><li><a id="zh-cn" href="/zh-cn/core/cluster-and-ipc.html" style="color: #22ab28">中文</a></li></ul>
        </li>
      
    </ul>
  </div>
  <dl><dt id="title-Intro" style="cursor: pointer;" class="aside-title">新手指南<a id="collapse-icon-Intro" class="icon opend"></a></dt><dd id=panel-Intro><ul><li><a href="/zh-cn/intro/index.html" class="menu-link">Egg.js 是什么?</a></li><li><a href="/zh-cn/intro/egg-and-koa.html" class="menu-link">Egg.js 和 Koa</a></li><li><a href="/zh-cn/intro/quickstart.html" class="menu-link">快速入门</a></li><li><a href="/zh-cn/tutorials/progressive.html" class="menu-link">渐进式开发</a></li><li><a href="/zh-cn/migration.html" class="menu-link">2.x 升级指南</a></li></ul></dd><dt id="title-Basics" style="cursor: pointer;" class="aside-title">基础功能<a id="collapse-icon-Basics" class="icon opend"></a></dt><dd id=panel-Basics><ul><li><a href="/zh-cn/basics/structure.html" class="menu-link">目录结构</a></li><li><a href="/zh-cn/basics/objects.html" class="menu-link">内置对象</a></li><li><a href="/zh-cn/basics/env.html" class="menu-link">运行环境</a></li><li><a href="/zh-cn/basics/config.html" class="menu-link">配置</a></li><li><a href="/zh-cn/basics/middleware.html" class="menu-link">中间件</a></li><li><a href="/zh-cn/basics/router.html" class="menu-link">Router</a></li><li><a href="/zh-cn/basics/controller.html" class="menu-link">Controller</a></li><li><a href="/zh-cn/basics/service.html" class="menu-link">Service</a></li><li><a href="/zh-cn/basics/plugin.html" class="menu-link">插件</a></li><li><a href="/zh-cn/basics/schedule.html" class="menu-link">定时任务</a></li><li><a href="/zh-cn/basics/extend.html" class="menu-link">框架扩展</a></li><li><a href="/zh-cn/basics/app-start.html" class="menu-link">启动自定义</a></li></ul></dd><dt id="title-Core" style="cursor: pointer;" class="aside-title">核心功能<a id="collapse-icon-Core" class="icon opend"></a></dt><dd id=panel-Core><ul><li><a href="/zh-cn/core/development.html" class="menu-link">本地开发</a></li><li><a href="/zh-cn/core/unittest.html" class="menu-link">单元测试</a></li><li><a href="/zh-cn/core/deployment.html" class="menu-link">应用部署</a></li><li><a href="/zh-cn/core/logger.html" class="menu-link">日志</a></li><li><a href="/zh-cn/core/httpclient.html" class="menu-link">HttpClient</a></li><li><a href="/zh-cn/core/cookie-and-session.html" class="menu-link">Cookie and Session</a></li><li><a href="/zh-cn/core/cluster-and-ipc.html" class="menu-link">多进程模型和进程间通讯</a></li><li><a href="/zh-cn/core/view.html" class="menu-link">模板渲染</a></li><li><a href="/zh-cn/core/error-handling.html" class="menu-link">异常处理</a></li><li><a href="/zh-cn/core/security.html" class="menu-link">安全</a></li><li><a href="/zh-cn/core/i18n.html" class="menu-link">国际化</a></li></ul></dd><dt id="title-Tutorials" style="cursor: pointer;" class="aside-title">教程<a id="collapse-icon-Tutorials" class="icon opend"></a></dt><dd id=panel-Tutorials><ul><li><a href="/zh-cn/tutorials/mysql.html" class="menu-link">MySQL</a></li><li><a href="/zh-cn/tutorials/restful.html" class="menu-link">RESTful API</a></li><li><a href="/zh-cn/tutorials/passport.html" class="menu-link">Passport 鉴权</a></li><li><a href="/zh-cn/tutorials/socketio.html" class="menu-link">Socket.IO</a></li><li><a href="/zh-cn/tutorials/assets.html" class="menu-link">静态资源</a></li><li><a href="/zh-cn/tutorials/typescript.html" class="menu-link">TypeScript</a></li></ul></dd><dt id="title-Advanced" style="cursor: pointer;" class="aside-title">进阶<a id="collapse-icon-Advanced" class="icon opend"></a></dt><dd id=panel-Advanced><ul><li><a href="/zh-cn/advanced/loader.html" class="menu-link">Loader</a></li><li><a href="/zh-cn/advanced/plugin.html" class="menu-link">插件开发</a></li><li><a href="/zh-cn/advanced/framework.html" class="menu-link">框架开发</a></li><li><a href="/zh-cn/advanced/cluster-client.html" class="menu-link">多进程研发模式增强</a></li><li><a href="/zh-cn/advanced/view-plugin.html" class="menu-link">模板插件开发规范</a></li><li><a href="/zh-cn/style-guide.html" class="menu-link">代码风格指南</a></li></ul></dd><dt id="title-Community" style="cursor: pointer;" class="aside-title">社区<a id="collapse-icon-Community" class="icon opend"></a></dt><dd id=panel-Community><ul><li><a href="/zh-cn/plugins/" class="menu-link">内置插件列表</a></li><li><a href="/zh-cn/contributing.html" class="menu-link">如何贡献</a></li><li><a href="/zh-cn/resource.html" class="menu-link">资源</a></li><li><a href="/zh-cn/faq.html" class="menu-link">常见问题</a></li></ul></dd></dl>
</aside>
<script>
var mobileTrigger = document.getElementById('mobileTrigger');
var mobileAside = document.getElementById('mobileAside');

var expandMenu = function(title) {
  // handle icon
  const collapseIcon = document.getElementById('collapse-icon-' + title);
  if (collapseIcon) {
    collapseIcon.className = 'icon opend';
  }
  // handle panelEle
  const panelEle = document.getElementById('panel-' + title);
  if (panelEle) {
    panelEle.className = '';
  }
}

var collapseMenu = function(title) {
  // handle icon
  const collapseIcon = document.getElementById('collapse-icon-' + title);
  if (collapseIcon) {
    collapseIcon.className = 'icon closed';
  }
  // handle panelEle
  const panelEle = document.getElementById('panel-' + title);
  if (panelEle) {
    panelEle.className = 'aside-panel-hidden';
  }
}

mobileAside.onclick = function(e) {
  const targetId = e.target.id;
  if (targetId && (targetId.indexOf('title-') > -1 || targetId.indexOf('collapse-icon-') > -1)) {
    const title = targetId.replace('title-', '').replace('collapse-icon-', '');
    try { 
      // the the browser may have no localStroage or JSON.parse may throw exception.
      const menuInfo = JSON.parse(window.localStorage.getItem('menuInfo'));
        
      // current menu status
      const curClosed = menuInfo[title] ? menuInfo[title].closed : false; // default false

      // change UI
      curClosed ? expandMenu(title) : collapseMenu(title);

      // save menuInfo to localStorage
      menuInfo[title] = { closed: !curClosed } // opposite
      window.localStorage.setItem('menuInfo', JSON.stringify(menuInfo));
    } catch (e) {}
  }
};

mobileTrigger.onclick = function(e) {
  e.preventDefault();
  if (mobileAside.className.indexOf('mobile-show') === -1) {
    mobileAside.className += ' mobile-show';
  } else {
    mobileAside.className = 'toc';
  }
};

(function() {
  // save data to localStorage because the page will refresh when user change the url.
  let menuInfo;
  try { 
    // the the browser may have no localStroage or JSON.parse may throw exception.
    menuInfo = JSON.parse(window.localStorage.getItem('menuInfo'));
    if (!menuInfo) {
      menuInfo = {};
      window.localStorage.setItem('menuInfo', JSON.stringify(menuInfo));
    }
  } catch (e) {
    menuInfo = {}; // default {}
  }

  for (const title in menuInfo) {
    if (menuInfo[title] && menuInfo[title].closed) { // menu in closed status.
      collapseMenu(title);
    } else {
      expandMenu(title);
    }
  }

  // highlight menu
  const pathname = window.location.pathname;
  const selector = `a[href="${pathname}"].menu-link,a[href="${pathname}index.html"].menu-link`;
  const menuItem = mobileAside.querySelector(selector);
  if (menuItem) { menuItem.className += ' highlight'; }
})();
</script>

</div>

  </div>
</body>
<script src="https://cdn.jsdelivr.net/docsearch.js/2/docsearch.min.js"></script>
<script>
docsearch({
  apiKey: '1561de31a86f79507ea00cdb54ce647c',
  indexName: 'eggjs',
  inputSelector: '#search-query',
});
</script>
<div class="cnzz">
<script src="https://s11.cnzz.com/z_stat.php?id=1261142226&web_id=1261142226" language="JavaScript"></script>
</div>

</html>
